package cn.cup.dmp.report

import java.util.Properties

import cn.cup.dmp.beans.RptAreaDistributeResult
import cn.cup.dmp.config.ConfigHelper
import cn.cup.dmp.utils.RptKpiHandler
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}

/**
  * Regional-distribution report job.
  *
  * Reads the pre-processed ad-log parquet data, computes per-row KPI metrics via
  * [[RptKpiHandler.rptKpi]], aggregates them grouped by (province, city), and
  * overwrites the configured JDBC report table with the result.
  */
object User_Area_Distribute_Analysis {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DMP平台-地域报表").setMaster("local[*]")
    // Use the serializer configured for the project (e.g. Kryo) instead of Java default.
    conf.set("spark.serializer", ConfigHelper.ser)
    val sc = new SparkContext(conf)
    // Ensure the SparkContext is always released, even if a stage fails —
    // the original code leaked the context on any exception before sc.stop().
    try {
      val sql = new SQLContext(sc)

      // Load the pre-processed log data from parquet.
      val dataFrame: DataFrame = sql.read.parquet(ConfigHelper.destPath)

      // Key each row by (province, city) with its KPI vector.
      // Per the original author's note, the list is (in order):
      // raw requests, valid requests, ad-eligible requests, bids entered,
      // bids won, win spend, win cost, impressions, clicks
      // — TODO confirm against RptKpiHandler.rptKpi.
      val keyedKpis = dataFrame.map(row => {
        val kpis: List[Double] = RptKpiHandler.rptKpi(row)
        val province = row.getAs[String]("provincename")
        val city = row.getAs[String]("cityname")
        ((province, city), kpis)
      })

      // Element-wise sum of the KPI vectors for every (province, city) group.
      val resultData: RDD[((String, String), List[Double])] =
        keyedKpis.reduceByKey((a, b) => a.zip(b).map { case (x, y) => x + y })

      import sql.implicits._
      // Map each aggregate onto the report bean; counts are truncated to Int,
      // the two monetary fields (indices 5 and 6) stay Double.
      val frame: DataFrame = resultData.map { case ((province, city), m) =>
        RptAreaDistributeResult(province, city,
          m(0).toInt, m(1).toInt, m(2).toInt, m(3).toInt, m(4).toInt,
          m(5), m(6), m(7).toInt, m(8).toInt)
      }.toDF()

      // Overwrite the report table in the configured JDBC database.
      val connectProperties = new Properties()
      connectProperties.setProperty("user", ConfigHelper.username)
      connectProperties.setProperty("password", ConfigHelper.password)
      frame.write.mode("overwrite")
        .jdbc(ConfigHelper.url, ConfigHelper.area_Distribute_table, connectProperties)
    } finally {
      sc.stop()
    }
  }
}
